Add LangSmith tracing plugin for Temporal workflows#1369
Add LangSmith tracing plugin for Temporal workflows#1369
Conversation
2803b95 to
768ac70
Compare
Implements a LangSmith contrib plugin that creates trace hierarchies for Temporal operations (workflows, activities, signals, queries, updates, child workflows, Nexus). Supports ambient @Traceable context propagation, replay-safe tracing, and an add_temporal_runs toggle for lightweight context-only mode. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
…late - Add ReplaySafeRunTree wrapper that handles replay skipping and sandbox safety (post/end/patch no-op during replay, sandbox_unrestricted in workflow context), inspired by OTel plugin's _ReplaySafeSpan pattern - Add config.maybe_run() to eliminate repeated config kwargs at every call site - Add _traced_call (client outbound) and _traced_outbound (workflow outbound) helpers to reduce interceptor methods to one-liners - Fold _extract_context into _workflow_maybe_run for workflow inbound - Remove _safe_post, _safe_patch helpers (internalized in wrapper) - Remove in_workflow parameter from _maybe_run (wrapper detects it) - Establish consistent wrapping invariant: all run references are ReplaySafeRunTree, unwrapping is unconditional ._run at RunTree constructor boundary - Parametrize redundant unit tests (client outbound, workflow inbound/outbound) and remove duplicate test - Remove _make_interceptor test helper, use LangSmithInterceptor directly - Collapse plugin constructor tests into one, add comprehensive plugin integration test, remove redundant sandbox tests Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Fix ruff I001 import sorting violations in _interceptor.py and test_integration.py. Extract _get_current_run_safe() helper for reading ambient LangSmith context with replay safety. Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
- Change add_temporal_runs default to False in both plugin and interceptor (reviewer preference for opt-in behavior) - Rename plugin to langchain.LangSmithPlugin per organization.PluginName convention - Prefix header key with _temporal- to avoid collisions - Update all tests to explicitly pass add_temporal_runs=True Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
- Add @Traceable call (outer_chain) directly in ComprehensiveWorkflow to test non-deterministic tracing alongside deterministic replay - Set max_cached_workflows=0 on all test workers to force replay on every workflow task, exposing header non-determinism - Restructure comprehensive tests with mid-workflow worker restart: one shared collector across two worker lifetimes proves context propagates via headers, not cached plugin state - Add is_waiting_for_signal query and poll helper for deterministic sync (no arbitrary sleeps) - Consolidate make_mock_ls_client in conftest.py, remove unused fixtures, use raw client for polling to avoid trace contamination - Tests are expected to fail (TDD): sandbox blocks @Traceable in workflows, max_cached_workflows=0 exposes outputs=None on eviction Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Move RunTree.post()/patch() I/O off the workflow task thread to a single-worker ThreadPoolExecutor, preventing deadlocks from compressed_traces.lock contention with the LangSmith drain thread. Key changes: - _ReplaySafeRunTree.create_child() override propagates replay safety and deterministic IDs to nested @langsmith.traceable calls - Executor-backed post()/patch() with FIFO ordering and fire-and-forget error logging via Future.add_done_callback - _ContextBridgeRunTree for add_temporal_runs=False without external context — invisible parent that produces root @Traceable runs - aio_to_thread patch simplified: removed harmful replay-time tracing disable, added error gate for async @Traceable without plugin - Plugin shutdown via SimplePlugin.run_context instead of dead method - Fix misleading comments referencing test artifacts instead of production reasons, remove OTel cross-references - Strict dump_runs catches dangling parent_run_id references - Add **/CLAUDE.md to .gitignore Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Replace ~35 Any annotations across _plugin.py and _interceptor.py with precise types (langsmith.Client, RunTree, _ReplaySafeRunTree, specific SDK interceptor input types, etc.). Add _InputWithHeaders Protocol for private helpers matching the OTel interceptor pattern. Narrow return types to match base class signatures exactly. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Prefix unused mock parameters with underscore (_args, _kwargs) and rename unused variable (_collector) to satisfy basedpyright's reportUnusedParameter and reportUnusedVariable checks. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
- Remove useless _get_current_run_safe wrapper (inline get_current_run_tree) - Restore generic type params on interceptor return types (ActivityHandle[Any], ChildWorkflowHandle[Any, Any]) to match base class exactly - Fix _make_bridge return type (Any → _ContextBridgeRunTree) - Fix _poll_query helper types (Any → WorkflowHandle, Callable) - Strengthen weak assertions in mixed sync/async integration tests - Add _InputWithHeaders Protocol for private helper input params Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Wrap all 5 activity definitions with @Traceable as outer decorator to test LangSmith tracing through the full activity execution path. Update all 9 expected trace hierarchies to account for the additional @Traceable run nested under each RunActivity. Fix outputs assertion to only check interceptor runs (colon-prefixed names) since @Traceable captures actual return values rather than the interceptor's {'status': 'ok'}. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Bug 1: Replace stale _current_run snapshot with ambient context in outbound interceptor. Add _get_current_run_for_propagation() helper that filters _ContextBridgeRunTree from ambient context. Outbound methods now read get_current_run_tree() for @Traceable nesting instead of a frozen reference from workflow entry. Bug 2: Add tracing_context() to Nexus inbound interceptor for both execute_nexus_operation_start and execute_nexus_operation_cancel, matching the activity inbound pattern. Ensures @Traceable functions in Nexus handlers have a LangSmith client even with add_temporal_runs=False. Remove handler suppression (is_handler check, _workflow_is_active flag) to align with OTel interceptor which creates spans for all handlers unconditionally. Add dump_traces() to test infrastructure for per-root-trace assertions. Restructure comprehensive tests so user_pipeline only wraps start_workflow, with polling/signals/queries as independent root traces. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Built-in queries like __temporal_workflow_metadata, __stack_trace, and __enhanced_stack_trace are fired automatically by infrastructure (e.g. the Temporal Web UI) and are not user-facing. Filter them out of LangSmith traces when add_temporal_runs=True to reduce noise. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
f11f660 to
d9fb85a
Compare
- Replace :class:`RunTree` cross-references with backtick literals in docstrings to fix pydoctor build failure (exit status 3). - Add run ID dedup to InMemoryRunCollector.record_create to match real LangSmith API upsert semantics. Fixes flaky Windows CI failure where combined replay+new-event activations caused duplicate trace records with deterministic IDs. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…fication - Reword sandbox/event loop terminology to use each in correct context - Make _safe_aio_to_thread docstring prescriptive (must not block) - Fix end() to use workflow.now() instead of datetime.now(), remove sandbox_unrestricted() from end() - Remove dead uuid4 try/except in read-only context - Remove redundant lazy import langsmith in __init__ - Improve _ContextBridgeRunTree, ls_client, _traced_outbound docs - Change get_current_run_tree → _get_current_run_for_propagation at call sites that propagate context - Simplify _maybe_run to yield None; callers use ambient context via _get_current_run_for_propagation() instead of the yielded value - Full comment audit: fix stale refs, move misplaced comments Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…s workers Previously, all workers sharing a LangSmithPlugin used the same LangSmithInterceptor (and its ThreadPoolExecutor). Now each worker gets its own interceptor via a factory in configure_worker, while client interception uses a shared wrapper that only implements client.Interceptor to avoid being pulled into workers by _init_from_config. Also removes the sync fallback from _submit (formerly _submit_or_fallback) so executor-after-shutdown errors surface immediately instead of silently degrading. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…eRunTree executor.submit() is not blocked by the workflow sandbox, so the sandbox_unrestricted context manager around _submit calls in post() and patch() was unnecessary. Removes the wrappers and corresponding unit test assertions. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
The old name was misleading — it doesn't bridge contexts. It's a factory that sits in the LangSmith tracing context as a placeholder parent so @Traceable can call create_child(), producing independent root _ReplaySafeRunTree instances with no parent link. Also removes unnecessary sandbox_unrestricted from post/patch since executor.submit() is not blocked by the workflow sandbox. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Rename manually constructed dicts to more descriptive names: - kwargs → run_tree_args (used to build RunTree instances) - ctx_kwargs → tracing_args (used to build tracing_context calls) Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
- _extract_context / _extract_nexus_context now accept ls_client and return fully-formed parents, eliminating 4 call-site fix-ups - Remove unnecessary _ReplaySafeRunTree unwrap in _make_run — RunTree only accesses .id/.dotted_order/.trace_id which delegate transparently - Simplify tracing_args construction by always including project_name and parent (tracing_context treats None same as absent) - Clean up _workflow_maybe_run: eliminate intermediate factory/ tracing_parent variables with single conditional expression Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
…h traces StartFoo completes instantly while RunFoo runs for the operation's lifetime, making the parent-child timing misleading in the UI. Now headers carry the ambient parent's context instead of StartFoo's, so RunFoo nests under the same parent as StartFoo. Adds _traced_start for client outbound start operations (separate from _traced_call used by query/signal/update which keep parent-child). Workflow outbound _traced_outbound captures ambient context before maybe_run for all operations. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Covers quick start, example chatbot, add_temporal_runs toggle, where @Traceable works, migration guide, replay safety, and context propagation. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
| from temporalio.worker.workflow_sandbox import SandboxedWorkflowRunner | ||
|
|
||
|
|
||
| class _ClientOnlyLangSmithInterceptor(temporalio.client.Interceptor): |
There was a problem hiding this comment.
Does this belong in _interceptor.py? Does _interceptor.py need to be broken up into a few files? It's 1k lines long.
There was a problem hiding this comment.
This is a plugin-level wrapper that is used to help with wrangling interceptors so that they properly dedupe when we handle both client and worker interceptors - isn't really an artifact of the real interceptor so I don't think it belongs there.
There was a problem hiding this comment.
Pull request overview
This PR introduces a new temporalio.contrib.langsmith integration that traces Temporal client/worker operations into LangSmith, with replay-safe context propagation through Temporal headers and an add_temporal_runs toggle to include/exclude Temporal operation nodes.
Changes:
- Added
LangSmithPlugin+LangSmithInterceptorto emit LangSmith run hierarchies for workflows, activities, signals/queries/updates, child workflows, and Nexus operations. - Implemented replay-safe tracing via deterministic IDs, workflow-safe time usage, and background-thread I/O for LangSmith
post/patch. - Added extensive unit/integration/E2E tests plus documentation for the new contrib package.
Reviewed changes
Copilot reviewed 10 out of 13 changed files in this pull request and generated 7 comments.
Show a summary per file
| File | Description |
|---|---|
temporalio/contrib/langsmith/_interceptor.py |
Core tracing + propagation logic, replay-safe run wrappers, workflow/activity/client/Nexus interceptors. |
temporalio/contrib/langsmith/_plugin.py |
Plugin wiring, sandbox passthrough configuration, and run context flushing. |
temporalio/contrib/langsmith/__init__.py |
Public exports for the contrib package. |
temporalio/contrib/langsmith/README.md |
User-facing documentation and usage examples for the LangSmith integration. |
tests/contrib/langsmith/conftest.py |
In-memory LangSmith client/run collector helpers for tests. |
tests/contrib/langsmith/test_interceptor.py |
Unit tests for interceptor behavior (propagation, replay safety, toggles, Nexus). |
tests/contrib/langsmith/test_integration.py |
Integration/E2E tests against a real Temporal worker and Nexus operations. |
tests/contrib/langsmith/test_plugin.py |
Plugin construction and end-to-end plugin wiring tests. |
tests/contrib/langsmith/test_background_io.py |
Unit tests for executor-backed post/patch, replay suppression, and factory behavior. |
pyproject.toml |
Adds langsmith to dev dependencies for running tests. |
.gitignore |
Ignores CLAUDE.md files. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| def __init__( | ||
| self, | ||
| *, | ||
| client: langsmith.Client | None = None, | ||
| project_name: str | None = None, | ||
| add_temporal_runs: bool = False, | ||
| default_metadata: dict[str, Any] | None = None, | ||
| default_tags: list[str] | None = None, | ||
| ) -> None: | ||
| """Initialize the LangSmith interceptor with tracing configuration.""" | ||
| super().__init__() | ||
| if client is None: | ||
| client = langsmith.Client() | ||
| self._client = client | ||
| self._project_name = project_name | ||
| self._add_temporal_runs = add_temporal_runs | ||
| self._default_metadata = default_metadata or {} | ||
| self._default_tags = default_tags or [] | ||
| self._executor = ThreadPoolExecutor(max_workers=1) | ||
|
|
There was a problem hiding this comment.
LangSmithInterceptor creates a ThreadPoolExecutor (max_workers=1) but there’s no shutdown path. Since executor threads are typically non-daemon, this can keep processes alive after worker shutdown and leak threads across tests/workers. Consider adding an explicit close/shutdown method on the interceptor and invoking it from the plugin’s run_context (or another worker lifecycle hook) so the executor is always shut down cleanly.
| header = headers.get(HEADER_KEY) | ||
| if not header: | ||
| return None | ||
| ls_headers = _payload_converter.from_payloads([header])[0] | ||
| run = RunTree.from_headers(ls_headers) | ||
| if run is None: | ||
| return None | ||
| run.ls_client = ls_client | ||
| return _ReplaySafeRunTree(run, executor=executor) | ||
|
|
There was a problem hiding this comment.
_extract_context can raise (e.g., payload decode errors or RunTree.from_headers parsing errors) if the header is malformed or from a different version. Since this may run inside workflow code paths, an exception here can fail workflow tasks. Consider wrapping decode/from_headers in try/except and returning None on failure (and avoid logging from workflow sandbox).
There was a problem hiding this comment.
@tconley1428 when parsing a header fails, this is just a telemetry issue so in my mind we just log and continue, can you confirm?
| """Extract LangSmith context from Nexus string headers.""" | ||
| raw = headers.get(HEADER_KEY) | ||
| if not raw: | ||
| return None | ||
| ls_headers = json.loads(raw) | ||
| run = RunTree.from_headers(ls_headers) | ||
| if run is None: | ||
| return None | ||
| run.ls_client = ls_client | ||
| return _ReplaySafeRunTree(run, executor=executor) | ||
|
|
There was a problem hiding this comment.
_extract_nexus_context calls json.loads(raw) without guarding against JSONDecodeError or unexpected types. A malformed header would raise and fail the Nexus handler execution. Consider catching decode/parsing exceptions and returning None (or otherwise treating the header as absent) to keep Nexus operations resilient to header corruption/version skew.
| ## Quick Start | ||
|
|
||
| Register the plugin on your Temporal client. You need it on both the client (starter) side and the workers: | ||
|
|
||
| ```python | ||
| from temporalio.client import Client | ||
| from temporalio.contrib.langsmith import LangSmithPlugin | ||
|
|
||
| client = await Client.connect( | ||
| "localhost:7233", | ||
| plugins=[LangSmithPlugin(project_name="my-project")], | ||
| ) | ||
| ``` |
There was a problem hiding this comment.
The README’s Quick Start doesn’t mention installing the required dependency. Since langsmith is not a core dependency of temporalio, users will need an explicit install step (e.g., pip install temporalio[langsmith] if provided, or pip install langsmith). Adding this avoids import-time surprises when trying to use temporalio.contrib.langsmith.
| "googleapis-common-protos==1.70.0", | ||
| "pytest-rerunfailures>=16.1", | ||
| "moto[s3,server]>=5", | ||
| "langsmith>=0.7.17", | ||
| ] |
There was a problem hiding this comment.
langsmith is added only to the dev dependency group here, but temporalio.contrib.langsmith imports langsmith at runtime. Unless this is intended to be “tests-only”, consider also adding a langsmith = [...] entry under [project.optional-dependencies] so downstream users can install temporalio[langsmith] and packaging metadata reflects the optional runtime dependency.
There was a problem hiding this comment.
+1 here (re: Copilot's comment that this should probably not be a dev dependency), but also, why is this the minimum version supported? Could/should we check for compatibility with older langsmith versions? E.g., 0.7.0 should work per semver? What's our goal or policy for compatibility?
DABH
left a comment
There was a problem hiding this comment.
Not too many comments beyond what's already been said but a couple questions/comments for your consideration
| "googleapis-common-protos==1.70.0", | ||
| "pytest-rerunfailures>=16.1", | ||
| "moto[s3,server]>=5", | ||
| "langsmith>=0.7.17", | ||
| ] |
There was a problem hiding this comment.
+1 here (re: Copilot's comment that this should probably not be a dev dependency), but also, why is this the minimum version supported? Could/should we check for compatibility with older langsmith versions? E.g., 0.7.0 should work per semver? What's our goal or policy for compatibility?
| ls_headers = run_tree.to_headers() | ||
| return { | ||
| **headers, | ||
| HEADER_KEY: _payload_converter.to_payloads([ls_headers])[0], | ||
| } |
There was a problem hiding this comment.
Are there any concerns around header size limits? Can we add a comment or something indicating what the expected size of the header would be here? I know Temporal has some header/payload size limits
| @@ -0,0 +1,113 @@ | |||
| """Shared test helpers for LangSmith plugin tests.""" | |||
There was a problem hiding this comment.
Can we add test(s) when an Activity raises an exception? (Does the LangSmith run get properly ended with error status?)
What about workflow cancellation or timeout? These are important for production use since error traces are often the most valuable ones in LangSmith.
There was a problem hiding this comment.
See ActivityFailureWorkflow in test_integration.py for failed activity. Will add ones for workflow cancellation/timeout, as well as activity timeout/any other errors AI can come up with.
| from temporalio.contrib.langsmith._plugin import LangSmithPlugin | ||
|
|
||
| __all__ = [ | ||
| "LangSmithInterceptor", |
There was a problem hiding this comment.
Will users directly create / interface with LangSmithInterceptors? Is it necessary to publicly expose this?
Previously, when client=None, each make_interceptor() call created a new langsmith.Client. This meant per-worker clients were never flushed. Now a single client is created eagerly in __init__ and shared via the make_interceptor closure. Also fix WorkerConfig import path for basedpyright. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
| _aio_to_thread_patched = False | ||
|
|
||
|
|
||
| def _patch_aio_to_thread() -> None: |
There was a problem hiding this comment.
Open question: This monkey patch is pretty benign/safe, since it mimics default behavior when outside of a workflow, but it is still a monkey patch. I wonder if Langchain would be able to expose some kind of official API/method for us to customize the default async->sync transition rather than us needing to rely on patching langsmith's internal code?
There was a problem hiding this comment.
Also to add more clarity to what this is doing. The doc comment reads Functions passed here must not perform blocking I/O. Normally, the post/patch operations which are passed into this function do perform I/O; however the integration wraps those implementations with replay-safe wrappers, which allow them to be placed on a separate ThreadPoolExecutor via this function. (So no blocking happens here by design.) See _ReplaySafeRunTree::_submit function in this file.
Add langsmith>=0.7.0 to [project.optional-dependencies] so users can install via pip install temporalio[langsmith]. Add Installation section to the LangSmith plugin README. Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
Summary
temporalio.contrib.langsmithplugin that creates LangSmith trace hierarchies for Temporal operations (workflows, activities, signals, queries, updates, child workflows, Nexus)@traceablecontext propagation through Temporal headers, replay-safe tracing, and anadd_temporal_runstoggle for lightweight context-only mode🤖 Generated with Claude Code